# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import primefac, functools
import sympy as sm
from hysop.tools.numerics import float_to_complex_dtype
from hysop.tools.numpywrappers import npw
from hysop.tools.decorators import debug
from hysop.tools.warning import HysopWarning
from hysop.tools.htypes import first_not_None, to_tuple, check_instance
from hysop.core.graph.graph import op_apply
from hysop.core.memory.memory_request import MemoryRequest
from hysop.fields.continuous_field import Field, ScalarField
from hysop.parameters.tensor_parameter import TensorParameter
from hysop.parameters.scalar_parameter import ScalarParameter
from hysop.backend.device.opencl.opencl_fft import OpenClFFT
from hysop.backend.device.opencl.opencl_symbolic import OpenClSymbolic
from hysop.backend.device.codegen.base.variables import dtype_to_ctype
from hysop.symbolic import local_indices_symbols
from hysop.symbolic.relational import Assignment
from hysop.symbolic.complex import ComplexMul
from hysop.symbolic.misc import Select
from hysop.symbolic.field import AppliedSymbolicField, SymbolicField, curl, laplacian
from hysop.symbolic.parameter import SymbolicScalarParameter
from hysop.symbolic.spectral import AppliedSpectralTransform
from hysop.operator.base.external_force import (
ExternalForce,
SpectralExternalForceOperatorBase,
)
[docs]
class OpenClSpectralExternalForce(SpectralExternalForceOperatorBase, OpenClSymbolic):
"""
Operator to compute the curl of a symbolic expression.
"""
@debug
def __new__(cls, Fext, **kwds):
return super().__new__(cls, Fext=Fext, **kwds)
@debug
def __init__(self, Fext, **kwds):
check_instance(Fext, SymbolicExternalForce)
super().__init__(Fext=Fext, **kwds)
[docs]
class SymbolicExternalForce(ExternalForce):
@debug
def __new__(cls, name, Fext, diffusion=None, **kwds):
return super().__new__(cls, name=name, dim=None, Fext=Fext, **kwds)
@debug
def __init__(self, name, Fext, diffusion=None, **kwds):
"""
Specify an external force as a tuple of symbolic expressions.
2D ExternalForce example:
1) Fext = -rho*g*e_y where rho is a field and g a constant
Fext = (0, -rho.s()*g)
2) Fext = (Rs*S+C)*e_y
Fext = (0, -Rs*S.s()+C.s())
"""
Fext = to_tuple(Fext)
dim = len(Fext)
Fext = list(Fext)
for i, e in enumerate(Fext):
if isinstance(e, type(None)):
Fext[i] = 0 # curl(const) = 0
msg = (
'Expression "{}" contains a SymbolicField, did you forget to apply it ?'
)
msg = msg.format(e)
if isinstance(e, sm.Basic):
assert not e.atoms(SymbolicField), msg
Fext = tuple(Fext)
super().__init__(name=name, dim=dim, Fext=Fext, **kwds)
diffusion = first_not_None(diffusion, {})
diffusion = {k: v for (k, v) in diffusion.items() if (v is not None)}
for k, v in diffusion.items():
assert k in self.input_fields(), k.short_description()
assert isinstance(v, ScalarParameter)
self._diffusion = diffusion
[docs]
def output_fields(self):
return set(self._diffusion.keys())
[docs]
def output_params(self):
return set()
[docs]
def initialize(self, op):
tg = op.new_transform_group()
fft_fields = tuple(self.input_fields())
forward_transforms = {}
backward_transforms = {}
for Si in fft_fields:
Fi = tg.require_forward_transform(Si)
forward_transforms[Si] = Fi
if Si in self.diffusion:
Bi = tg.require_backward_transform(Si)
backward_transforms[Si] = Bi
force_backward_transforms = {}
for Fi in op.force.fields:
force_backward_transforms[Fi] = tg.require_backward_transform(
Fi, custom_input_buffer="B0"
)
frame = None
fft_expressions = ()
for e in self.Fext:
if isinstance(e, sm.Basic):
efields = tuple(e.atoms(AppliedSymbolicField))
for sf in efields:
field = sf.field
assert field in forward_transforms, field.name
if field in self._diffusion:
assert field in backward_transforms, field.name
replace = {sf: forward_transforms[sf.field].s for sf in efields}
frame = next(iter(replace.values)).frame
e = e.xreplace(replace)
fft_expressions += (e,)
if frame is None:
msg = "Could not extract frame from expressions."
raise RuntimeError(frame)
fft_expressions = to_tuple(curl(fft_expressions, frame))
self.tg = tg
self.forward_transforms = forward_transforms
self.backward_transforms = backward_transforms
self.force_backward_transforms = force_backward_transforms
self.fft_expressions = fft_expressions
self.compute_required_kernels(op)
[docs]
def compute_required_kernels(self, op):
dts = op.dt.s
forces = op.force.s()
diffusion_kernels = {}
for f, nu in self.diffusion.items():
nus = nu.s
kname = f"filter_diffusion_{f.dim}d_{f.name}"
Ft = self.forward_transforms[f]
Fs = Ft.output_symbolic_array(f"{f.name}_hat")
E = 0
Wn = self.tg.push_expressions(laplacian(Ft.s, Ft.s.frame))
for Wi in Wn:
Wi = self.tg._indexed_wave_numbers[Wi]
E += Wi
expr = Assignment(Fs, Fs / (1 - nus * dts * E))
op.require_symbolic_kernel(kname, expr)
diffusion_kernels[f] = kname
force_kernels = ()
vorticity_kernels = ()
assert (
len(op.vorticity.fields)
== len(op.force.fields)
== len(self.fft_expressions)
)
for Fi, Wi, e in zip(
op.force.fields, op.vorticity.fields, self.fft_expressions
):
if e == 0:
force_kernels += (None,)
vorticity_kernels += (None,)
continue
Fi_hat = self.force_backward_transforms[Fi]
Fi_buf = Fi_hat.input_symbolic_array(f"{Fi.name}_hat")
Wn = self.tg.push_expressions(Assignment(Fi_hat, e))
msg = "Could not extract transforms."
try:
transforms = e.atoms(AppliedSpectralTransform)
except AttributeError:
raise RuntimeError(msg)
assert len(transforms) >= 1, msg
fft_buffers = {
Ft.s: Ft.output_symbolic_array(f"{Ft.field.name}_hat")
for Ft in self.forward_transforms.values()
}
wavenumbers = {Wi: self.tg._indexed_wave_numbers[Wi] for Wi in Wn}
replace = {}
replace.update(fft_buffers)
replace.update(wavenumbers)
expr = e.xreplace(replace)
expr = Assignment(Fi_buf, expr)
kname = f"compute_{Fi.var_name}"
op.require_symbolic_kernel(kname, expr)
force_kernels += (kname,)
Fis = Fi.s()
Wis = Wi.s()
expr = Assignment(Wis, Wis + dts * Fis)
kname = f"update_{Wi.var_name}"
op.require_symbolic_kernel(kname, expr)
vorticity_kernels += (kname,)
assert len(diffusion_kernels) == len(self.diffusion)
assert (
len(force_kernels) == op.vorticity.nb_components == len(vorticity_kernels)
)
self.diffusion_kernel_names = diffusion_kernels
self.force_kernel_names = force_kernels
self.vorticity_kernel_names = vorticity_kernels
[docs]
def discretize(self, op):
pass
[docs]
def get_mem_requests(self, op):
requests = {}
for Fi in self.forward_transforms.keys():
Ft = self.forward_transforms[Fi]
Bt = self.backward_transforms.get(Fi, None)
if Bt is not None:
assert Ft.backend is Bt.backend
assert Ft.output_dtype == Bt.input_dtype, (
Ft.output_dtype,
Bt.input_dtype,
)
assert Ft.output_shape == Bt.input_shape, (
Ft.output_shape,
Bt.input_shape,
)
shape = Ft.output_shape
dtype = Ft.output_dtype
request = MemoryRequest(
backend=self.tg.backend,
dtype=dtype,
shape=shape,
nb_components=1,
alignment=op.min_fft_alignment,
)
name = f"{Ft.field.name}_hat"
requests[name] = request
return requests
[docs]
def pre_setup(self, op, work):
for Fi in self.forward_transforms.keys():
Ft = self.forward_transforms[Fi]
Bt = self.backward_transforms.get(Fi, None)
(dtmp,) = work.get_buffer(op, f"{Ft.field.name}_hat")
Ft.configure_output_buffer(dtmp)
if Bt is not None:
Bt.configure_input_buffer(dtmp)
[docs]
def post_setup(self, op, work):
diffusion_kernels = {}
force_kernels = {}
compute_statistics = {}
vorticity_kernels = {}
ghost_exchangers = {}
queue = self.tg.backend.cl_env.default_queue
def build_launcher(knl, update_params):
def kernel_launcher(knl=knl, update_params=update_params, queue=queue):
kwds = update_params()
return knl(queue=queue, **kwds)
return kernel_launcher
for field, kname in self.diffusion_kernel_names.items():
dfield = op.get_input_discrete_field(field)
knl, update_params = op.symbolic_kernels[kname]
diffusion_kernels[field] = build_launcher(knl, update_params)
ghost_exchangers[field] = functools.partial(
dfield.build_ghost_exchanger(), queue=queue
)
if op.Fmin is not None:
min_values = npw.asarray(op.Fmin()).copy()
if op.Fmax is not None:
max_values = npw.asarray(op.Fmax()).copy()
for i, (kname0, kname1) in enumerate(
zip(self.force_kernel_names, self.vorticity_kernel_names)
):
if kname0 is None:
assert kname1 is None
continue
Wi = op.vorticity.fields[i]
Fi = op.force.fields[i]
dWi = op.dW.dfields[i]
dFi = op.dF.dfields[i]
knl, update_params = op.symbolic_kernels[kname0]
force_kernels[(Fi, Wi)] = build_launcher(knl, update_params)
knl, update_params = op.symbolic_kernels[kname1]
vorticity_kernels[(Fi, Wi)] = build_launcher(knl, update_params)
ghost_exchangers[Wi] = functools.partial(
dWi.build_ghost_exchanger(), queue=queue
)
def compute_statistic(
op=op,
queue=queue,
dFi=dFi,
min_values=min_values,
max_values=max_values,
):
if op.Fmin is not None:
min_values[i] = dFi.sdata.min(queue=queue).get()
if op.Fmax is not None:
max_values[i] = dFi.sdata.max(queue=queue).get()
compute_statistics[Fi] = compute_statistic
def update_statistics(op=op, min_values=min_values, max_values=max_values):
if op.Fmin is not None:
op.Fmin.value = min_values
if op.Fmax is not None:
op.Fmax.value = max_values
if op.Finf is not None:
op.Finf.value = npw.maximum(npw.abs(min_values), npw.abs(max_values))
assert (
len(diffusion_kernels)
== len(self.diffusion)
== len(self.backward_transforms)
)
assert (
len(vorticity_kernels)
== len(force_kernels)
== len(self.force_backward_transforms)
)
assert len(ghost_exchangers) == len(diffusion_kernels) + len(vorticity_kernels)
self.diffusion_kernels = diffusion_kernels
self.force_kernels = force_kernels
self.vorticity_kernels = vorticity_kernels
self.ghost_exchangers = ghost_exchangers
self.compute_statistics = compute_statistics
self.update_statistics = update_statistics
@op_apply
def apply(self, op, **kwds):
for field, Ft in self.forward_transforms.items():
evt = Ft()
if field in self.backward_transforms:
evt = self.diffusion_kernels[field]()
evt = self.backward_transforms[field]()
evt = self.ghost_exchangers[field]()
for Fi, Wi in self.force_kernels.keys():
evt = self.force_kernels[(Fi, Wi)]()
evt = self.force_backward_transforms[Fi]()
evt = self.compute_statistics[Fi]()
evt = self.vorticity_kernels[(Fi, Wi)]()
evt = self.ghost_exchangers[Wi]()
self.update_statistics()
def _extract_objects(self, obj_type):
objs = set()
for e in self.Fext:
try:
objs.update(e.atoms(obj_type))
except AttributeError:
pass
return objs
[docs]
def short_description(self):
return f"SymbolicExternalForce[name={self.name}]"
[docs]
def long_description(self):
sep = "\n *"
expressions = sep + sep.join(f"F{x} = {e}" for (x, e) in zip("xyz", self.Fext))
diffusion = sep + sep.join(
f"{f.pretty_name}: {p.pretty_name}" for (f, p) in self.diffusion.items()
)
input_fields = ", ".join(f.pretty_name for f in self.input_fields())
output_fields = ", ".join(f.pretty_name for f in self.output_fields())
input_params = ", ".join(p.pretty_name for p in self.input_params())
output_params = ", ".join(p.pretty_name for p in self.output_params())
ss = """SymbolicExternalForce:
name: {}
pretty_name: {}
expressions: {}
diffusion: {}
-----------------
input_fields: {}
output_fields: {}
input_params: {}
output_params: {}
""".format(
self.name,
self.pretty_name,
expressions,
diffusion,
input_fields,
output_fields,
input_params,
output_params,
)
return ss